package cn.echcz.twitbase.util

import cn.echcz.twitbase.exception.RepeatException
import cn.echcz.twitbase.util.Common._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

/**
  * Helper methods for common HBase client operations: opening connections,
  * creating tables and reading/writing/deleting single cells.
  *
  * Several methods pass `String` values where the HBase API expects byte
  * arrays. This relies on an implicit conversion being in scope (presumably
  * provided by `Common._` -- confirm against that object).
  */
object HbaseUtils {

  /**
    * Opens an HBase connection.
    *
    * A default HBase configuration is created first and every entry of
    * `setting` is then applied on top of it.
    *
    * @param setting custom HBase configuration entries (key -> value)
    * @return a new HBase connection; the caller is responsible for closing it
    */
  def getConnection(setting: Map[String, String]): Connection = {
    val config = HBaseConfiguration.create()
    setting.foreach { case (key, value) => config.set(key, value) }
    ConnectionFactory.createConnection(config)
  }

  /**
    * Obtains the HBase administration object of a connection.
    *
    * @param connection HBase connection
    * @return the administration object returned by `connection.getAdmin`
    */
  def getAdmin(connection: Connection): Admin = {
    connection.getAdmin
  }

  /**
    * Creates an HBase table with the given column families.
    *
    * The existence check and the creation are two separate calls, so this
    * method does not protect against another client creating the same table
    * in between.
    *
    * @param admin     HBase administration object
    * @param tableName name of the table to create
    * @param familys   names of the column families of the new table
    * @throws RepeatException if a table with this name already exists
    */
  def createTable(admin: Admin, tableName: String, familys: String*): Unit = {
    val name = TableName.valueOf(tableName)
    if (admin.tableExists(name)) {
      throw new RepeatException("表已经存在, 不能建立")
    }
    val descBuilder = TableDescriptorBuilder.newBuilder(name)
    familys.foreach { family =>
      // `family` is a String; the builder receives it through the implicit conversion in scope.
      val familyBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
      descBuilder.setColumnFamily(familyBuilder.build())
    }
    admin.createTable(descBuilder.build())
  }

  /**
    * Obtains a table handle from a connection.
    *
    * @param connection HBase connection
    * @param tableName  name of the table
    * @return the table handle; the caller is responsible for closing it
    */
  def getTable(connection: Connection, tableName: String): Table = {
    connection.getTable(TableName.valueOf(tableName))
  }


  /**
    * Runs `code` and afterwards closes the table and then the connection.
    *
    * Both resources are closed even when `code` throws, and the connection is
    * still closed when closing the table throws.
    *
    * @param connection HBase connection, closed last
    * @param table      table, closed first
    * @param code       the code to execute
    */
  def fastExe(connection: Connection, table: Table)(code: => Unit): Unit = {
    runThenClose(connection, table)(code)
  }

  /**
    * Runs `code` and afterwards closes the administration object and then the
    * connection.
    *
    * Both resources are closed even when `code` throws, and the connection is
    * still closed when closing the administration object throws.
    *
    * @param connection HBase connection, closed last
    * @param admin      administration object, closed first
    * @param code       the code to execute
    */
  def fastExe(connection: Connection, admin: Admin)(code: => Unit): Unit = {
    runThenClose(connection, admin)(code)
  }

  /**
    * Runs `code`, then closes `resource` followed by `connection`.
    *
    * The closes are nested in their own try/finally so that a failure while
    * closing `resource` cannot leak the connection.
    */
  private def runThenClose(connection: Connection, resource: java.io.Closeable)(code: => Unit): Unit = {
    try {
      code
    } finally {
      try {
        resource.close()
      } finally {
        connection.close()
      }
    }
  }

  /**
    * Stores a single value in HBase.
    *
    * @param table     table to write to
    * @param rowKey    row key
    * @param family    column family
    * @param qualifier column qualifier
    * @param value     value to store
    */
  def put(table: Table, rowKey: String, family: String, qualifier: String, value: String): Unit = {
    val row = new Put(rowKey)
    row.addColumn(family, qualifier, value)
    table.put(row)
  }

  /**
    * Reads a single value from HBase.
    *
    * @param table     table to read from
    * @param rowKey    row key
    * @param family    column family
    * @param qualifier column qualifier
    * @return the cell value decoded with `Bytes.toString`. NOTE(review): per the
    *         HBase API docs this is expected to be `null` when the cell does not
    *         exist -- callers should guard for that (e.g. wrap in `Option`).
    */
  def get(table: Table, rowKey: String, family: String, qualifier: String): String = {
    val row = new Get(rowKey)
    row.addColumn(family, qualifier)
    val result = table.get(row)
    Bytes.toString(result.getValue(family, qualifier))
  }

  /**
    * Deletes a single cell.
    *
    * @param table     table to delete from
    * @param rowKey    row key
    * @param family    column family
    * @param qualifier column qualifier
    * @param delAll    whether to delete all versions of the cell (`addColumns`)
    *                  rather than a single version (`addColumn`); defaults to false
    */
  def delete(table: Table, rowKey: String, family: String, qualifier: String, delAll: Boolean = false): Unit = {
    val row = new Delete(rowKey)
    if (delAll) {
      row.addColumns(family, qualifier)
    } else {
      row.addColumn(family, qualifier)
    }
    table.delete(row)
  }

  /**
    * Deletes a whole row.
    *
    * @param table  table to delete from
    * @param rowKey key of the row to delete
    */
  def deleteRow(table: Table, rowKey: String): Unit = {
    val row = new Delete(rowKey)
    table.delete(row)
  }
}
